3.6 Streaming
本节介绍 LangChain 中的流式输出机制。
为什么需要 Streaming?
"Streaming is crucial for enhancing the responsiveness of applications built on LLMs."
流式输出对于提升 LLM 应用的响应性至关重要:
- 更好的用户体验 - 实时看到输出,无需等待完整响应
- 进度可见 - 了解 Agent 正在执行什么操作
- 快速反馈 - 发现问题可以及早中断
四种流式模式
LangChain 提供四种流式输出模式:
1. Agent 进度 (stream_mode="updates")
每个 Agent 步骤后发送事件:
python
from langchain.agents import create_agent
agent = create_agent("gpt-4o", tools=[get_weather, search])
for event in agent.stream(
{"messages": [{"role": "user", "content": "北京天气怎么样?"}]},
stream_mode="updates"
):
print(event)输出示例:
python
# 事件 1: LLM 决定调用工具
{"llm": {"tool_calls": [{"name": "get_weather", "args": {"city": "北京"}}]}}
# 事件 2: 工具执行结果
{"tools": {"get_weather": "北京:晴,25度"}}
# 事件 3: LLM 生成最终回复
{"llm": {"content": "北京今天天气晴朗,温度25度,非常适合外出。"}}2. LLM Token (stream_mode="messages")
逐 token 流式输出:
python
for event in agent.stream(
{"messages": [{"role": "user", "content": "讲个故事"}]},
stream_mode="messages"
):
if hasattr(event, "content"):
print(event.content, end="", flush=True)输出:
从|前|有|一|座|山|,|山|里|有|座|庙|...3. 自定义更新 (stream_mode="custom")
在工具中发送自定义信号:
python
from langchain.agents import get_stream_writer, ToolRuntime
from langchain_core.tools import tool
@tool
def analyze_data(data: str, runtime: ToolRuntime) -> str:
"""分析数据"""
writer = get_stream_writer()
writer("正在加载数据...")
# 处理步骤 1
writer("正在分析数据...")
# 处理步骤 2
writer("正在生成报告...")
# 处理步骤 3
return "分析完成"
# 接收自定义更新
for event in agent.stream(
{"messages": [...]},
stream_mode="custom"
):
print(f"进度: {event}")输出:
进度: 正在加载数据...
进度: 正在分析数据...
进度: 正在生成报告...4. 组合模式
同时使用多种流式模式:
python
for event in agent.stream(
{"messages": [{"role": "user", "content": "分析数据并总结"}]},
stream_mode=["updates", "custom"]
):
if "custom" in event:
print(f"[进度] {event['custom']}")
elif "llm" in event:
print(f"[LLM] {event['llm']}")
elif "tools" in event:
print(f"[工具] {event['tools']}")模型级流式输出
直接对模型使用流式输出:
python
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o")
# 同步流式
for chunk in model.stream("讲一个关于 AI 的故事"):
print(chunk.content, end="", flush=True)异步流式
python
async def stream_response():
model = ChatOpenAI(model="gpt-4o")
async for chunk in model.astream("讲一个故事"):
print(chunk.content, end="", flush=True)
import asyncio
asyncio.run(stream_response())处理流式工具调用
工具调用也可以流式输出:
python
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o")
model_with_tools = model.bind_tools([get_weather])
for chunk in model_with_tools.stream("北京和上海天气怎么样?"):
# 检查是否有工具调用
if chunk.tool_call_chunks:
for tool_chunk in chunk.tool_call_chunks:
print(f"工具: {tool_chunk.get('name', '')}")
print(f"参数片段: {tool_chunk.get('args', '')}")
# 检查是否有文本内容
if chunk.content:
print(chunk.content, end="")禁用流式输出
在特定场景下禁用流式:
python
from langchain_openai import ChatOpenAI
# 创建模型时禁用
model = ChatOpenAI(model="gpt-4o", streaming=False)
# 或在调用时禁用
response = model.invoke("你好", config={"streaming": False})多 Agent 系统中选择性禁用:
python
# 只让主 Agent 流式输出,子 Agent 不流式
main_agent = create_agent("gpt-4o", tools=[...], streaming=True)
sub_agent = create_agent("gpt-3.5-turbo", tools=[...], streaming=False)完整流式示例
python
from langchain.agents import create_agent, get_stream_writer, ToolRuntime
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
@tool
def process_file(filename: str, runtime: ToolRuntime) -> str:
"""处理文件"""
writer = get_stream_writer()
writer(f"正在读取文件: {filename}")
# 模拟处理
import time
time.sleep(1)
writer("正在解析内容...")
time.sleep(1)
writer("处理完成!")
return f"文件 {filename} 处理完成,共 100 行"
# 创建 Agent
agent = create_agent(
ChatOpenAI(model="gpt-4o"),
tools=[process_file],
system_prompt="你是文件处理助手"
)
# 流式执行
print("开始处理...\n")
for event in agent.stream(
{"messages": [{"role": "user", "content": "处理 data.csv 文件"}]},
stream_mode=["updates", "custom", "messages"]
):
# 自定义进度
if "custom" in event:
print(f"[进度] {event['custom']}")
# Agent 步骤更新
elif "updates" in event:
update = event["updates"]
if "llm" in update:
print(f"[思考] {update['llm'].get('content', '')[:50]}...")
elif "tools" in update:
print(f"[工具结果] {update['tools']}")
# Token 流
elif "messages" in event:
msg = event["messages"]
if hasattr(msg, "content") and msg.content:
print(msg.content, end="", flush=True)
print("\n\n处理结束!")流式事件结构
updates 模式事件
python
{
"llm": {
"content": "...",
"tool_calls": [...],
"response_metadata": {...}
}
}
# 或
{
"tools": {
"tool_name": "工具返回结果"
}
}messages 模式事件
python
AIMessageChunk(
content="token内容",
id="msg_xxx",
response_metadata={...}
)custom 模式事件
python
"自定义消息字符串"
# 或任何可序列化对象最佳实践
| 场景 | 推荐模式 |
|---|---|
| 聊天应用 | messages - 逐字显示回复 |
| 复杂任务 | updates - 显示执行步骤 |
| 长时间处理 | custom - 显示进度更新 |
| 调试开发 | ["updates", "messages"] - 完整信息 |
注意事项
- 网络开销 - 流式会增加网络请求次数
- 错误处理 - 流式过程中的错误需要特别处理
- 取消支持 - 考虑支持用户中断长时间流式
- 缓冲区 - 某些环境需要刷新缓冲区才能实时显示